Flink is a complicated framework and provides many ways to tweak its execution. In this article, I’ll show four different ways to improve the performance of your Flink applications.

If you are not familiar with Flink, you may want to start with an introductory article. But if you are already familiar with Apache Flink, this article will help you make your applications a little bit faster.

Use Flink tuples

When you use operations like groupBy, join, or keyBy, Flink gives you a number of options for selecting a key in your dataset. You can use a key selector function:

// Join movies and ratings datasets
movies.join(ratings)
        // Use movie id as a key in both cases
        .where(new KeySelector<Movie, String>() {
            @Override
            public String getKey(Movie m) throws Exception {
                return m.getId();
            }
        })
        .equalTo(new KeySelector<Rating, String>() {
            @Override
            public String getKey(Rating r) throws Exception {
                return r.getMovieId();
            }
        })

Or you can specify field names in POJO types:

movies.join(ratings)
    // Use same fields as in the previous example
    .where("id")
    .equalTo("movieId")

But if you are working with Flink tuple types, you can simply specify the position of the tuple field that will be used as a key:

DataSet<Tuple2<String, String>> movies = ...
DataSet<Tuple3<String, String, Double>> ratings = ...

movies.join(ratings)
    // Specify fields positions in tuples
    .where(0)
    .equalTo(1)

The last option will give you the best performance, but what about readability? Does it mean that your code will now look like this?

DataSet<Tuple3<Integer, String, Double>> result = movies.join(ratings)
    .where(0)
    .equalTo(0)
    .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,Double>, Tuple3<Integer, String, Double>>() {
        // What is happening here?
        @Override
        public Tuple3<Integer, String, Double> join(Tuple2<Integer, String> first, Tuple2<Integer, Double> second) throws Exception {
            // Some tuples are joined with some other tuples and some fields are returned???
            return new Tuple3<>(first.f0, first.f1, second.f1);
        }
    });

A common idiom to improve readability in this case is to create a class that inherits from one of the TupleX classes and implements getters and setters for its fields. Here is how this looks for the Edge class from the Flink Gelly library, which has three fields and extends the Tuple3 class:

public class Edge<K, V> extends Tuple3<K, K, V> {

    public Edge(K source, K target, V value) {
        this.f0 = source;
        this.f1 = target;
        this.f2 = value;
    }
    
    // Getters and setters for readability
    public void setSource(K source) {
        this.f0 = source;
    }

    public K getSource() {
        return this.f0;
    }
    
    // Also has getters and setters for other fields
    ...
}
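
To see how such a subclass pays off inside a user-defined function, here is a small standalone sketch. The Tuple3 class below is only a minimal stand-in for Flink's class so that the snippet compiles without Flink on the classpath, and RatedMovie is a hypothetical name that does not come from Flink or from the examples above.

```java
// Minimal stand-in for Flink's Tuple3; an assumption so the sketch runs without Flink
class Tuple3<A, B, C> {
    public A f0;
    public B f1;
    public C f2;
}

// Hypothetical named tuple: positional fields stay available for key selection,
// while getters document what each position means
class RatedMovie extends Tuple3<Integer, String, Double> {
    // No-argument constructor, useful when instances are created reflectively
    RatedMovie() {}

    RatedMovie(Integer movieId, String title, Double rating) {
        this.f0 = movieId;
        this.f1 = title;
        this.f2 = rating;
    }

    Integer getMovieId() { return f0; }
    String getTitle() { return f1; }
    Double getRating() { return f2; }
}

class RatedMovieDemo {
    // Reads like domain code instead of "f1 of some tuple"
    static String describe(RatedMovie movie) {
        return movie.getTitle() + " (" + movie.getRating() + ")";
    }

    public static void main(String[] args) {
        System.out.println(describe(new RatedMovie(1, "Alien", 8.5)));
    }
}
```

The fields f0, f1, and f2 remain accessible, so position-based key selection keeps working while the function bodies use the named getters.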

Reuse Flink objects

Another option that you can use to improve the performance of your Flink application is to use mutable objects when you return data from a user-defined function. Take a look at this example:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            // A new Tuple instance is created on every execution
            collector.collect(new Tuple2<>(userName, changesCount));
        }
    });

As you can see, on every execution of the apply function we create a new instance of the Tuple2 class, which increases pressure on the garbage collector. One way to fix this problem is to reuse the same instance again and again:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        // Create an instance that we will reuse on every call
        private Tuple2<String, Long> result = new Tuple2<>();
    
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            
            // Set fields on an existing object instead of creating a new one
            result.f0 = userName;
            // Auto-boxing!! A new Long value may be created
            result.f1 = changesCount;
            
            // Reuse the same Tuple2 object
            collector.collect(result);
        }
    });

It’s a bit better. We no longer create a new Tuple2 instance on every call, but we still, indirectly, create an instance of the Long class. To solve this problem, Flink has a number of so-called value classes: IntValue, LongValue, StringValue, FloatValue, etc. The point of these classes is to provide mutable versions of built-in types, so that we can reuse them in our user-defined functions. Here is how we can use them:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, LongValue>, String, TimeWindow>() {
        // Create a mutable count instance
        private LongValue count = new LongValue();
        // Assign mutable count to the tuple
        private Tuple2<String, LongValue> result = new Tuple2<>("", count);
    
        @Override
        // Notice that the output type is now Tuple2<String, LongValue>
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, LongValue>> collector) throws Exception {
            long changesCount = ...
            
            // Set fields on an existing object instead of creating a new one
            result.f0 = userName;
            // Update mutable count value
            count.setValue(changesCount);
            
            // Reuse the same tuple and the same LongValue instance
            collector.collect(result);
        }
    });

This idiom is commonly used in Flink libraries like Flink Gelly.
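
Reusing an output object is only safe when nothing downstream keeps a reference to it after it has been emitted. The standalone sketch below illustrates this with plain Java rather than with Flink itself. MutableLong is a hypothetical stand-in for a value class such as LongValue, and the two consumers stand in for code that either copies each item immediately or buffers references to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a mutable value class such as LongValue
class MutableLong {
    private long value;
    void setValue(long value) { this.value = value; }
    long getValue() { return value; }
}

class ReuseDemo {
    // Emits three counts through one reused instance, as in the window function above
    static void emitCounts(Consumer<MutableLong> collector) {
        MutableLong reused = new MutableLong();
        for (long count = 1; count <= 3; count++) {
            reused.setValue(count);
            collector.accept(reused);
        }
    }

    // Safe: the consumer copies the primitive value out before the next call
    static List<Long> collectCopies() {
        List<Long> seen = new ArrayList<>();
        emitCounts(item -> seen.add(item.getValue()));
        return seen;
    }

    // Unsafe: the consumer keeps references, which all point to the same object
    static List<Long> collectReferences() {
        List<MutableLong> buffered = new ArrayList<>();
        emitCounts(buffered::add);
        List<Long> seen = new ArrayList<>();
        for (MutableLong item : buffered) {
            seen.add(item.getValue());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectCopies());     // [1, 2, 3]
        System.out.println(collectReferences()); // [3, 3, 3]
    }
}
```

The buffered variant sees only the last value three times, which is the kind of bug to watch for before switching a function to reused objects.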

Use function annotations

One more way to optimize your Flink application is to provide some information about what your user-defined functions are doing with input data. Since Flink can’t parse and understand code, you can provide crucial information that will help to build a more efficient execution plan. There are three annotations that we can use:

  • @ForwardedFields – specifies what fields in an input value were left unchanged and are used in an output value.
  • @NonForwardedFields – specifies fields which were not preserved in the same positions in the output.
  • @ReadFields – specifies what fields were used to compute a result value. You should only specify fields that were used in computations and not merely copied to the output.

Let’s take a look at how we can use the ForwardedFields annotation:

// Specify that the first element is copied without any changes
@ForwardedFields("0")
class MyFunction implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
    @Override
    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
        // Copy the first field without change
        return new Tuple2<>(value.f0, value.f1 + 123);
    }
}

This means that the first element in an input tuple is not being changed and it is returned in the same position.

If you don’t change a field, but simply move it into a different position, you can specify this with the ForwardedFields annotation as well. In the next example we swap the fields of an input tuple and tell Flink about it:

// 1st element goes into the 2nd position, and 2nd element goes into the 1st position
@ForwardedFields("0->1; 1->0")
class SwapArguments implements MapFunction<Tuple2<Long, Double>, Tuple2<Double, Long>> {
    @Override
    public Tuple2<Double, Long> map(Tuple2<Long, Double> value) {
        // Swap elements in a tuple
        return new Tuple2<>(value.f1, value.f0);
    }
}

The annotations mentioned above can only be applied to functions that have one input parameter, such as map or flatMap. If you have two input parameters, you can use the ForwardedFieldsFirst and the ForwardedFieldsSecond annotations that provide information about the first and the second parameters respectively.

Here is how we can use these annotations in an implementation of the JoinFunction interface:

// The first and second fields of the first input tuple are copied to the first and second positions of the output tuple
@ForwardedFieldsFirst("0; 1")
// The second field of the second input tuple is copied to the third position of the output tuple
@ForwardedFieldsSecond("1->2")
class MyJoin implements JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,Double>, Tuple3<Integer, String, Double>> {
    @Override
    public Tuple3<Integer, String, Double> join(Tuple2<Integer, String> first, Tuple2<Integer, Double> second) throws Exception {
        return new Tuple3<>(first.f0, first.f1, second.f1);
    }
}

Flink also provides NonForwardedFieldsFirst, NonForwardedFieldsSecond, ReadFieldsFirst, and ReadFieldsSecond annotations for similar purposes.
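
To make the forwarding notation concrete, the following standalone sketch turns an expression such as "0->1; 1->0" into a map from input position to output position. It is only an illustration of how to read the strings used above, not Flink's own parser. ForwardedFieldsNotation and parse are hypothetical names, and only the numeric forms shown in this article are handled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ForwardedFieldsNotation {
    // Returns input position -> output position for each forwarded field
    static Map<Integer, Integer> parse(String expression) {
        Map<Integer, Integer> forwarded = new LinkedHashMap<>();
        for (String part : expression.split(";")) {
            String rule = part.trim();
            if (rule.isEmpty()) {
                continue;
            }
            String[] sides = rule.split("->");
            int source = Integer.parseInt(sides[0].trim());
            // A bare position such as "0" means the field keeps its position
            int target = sides.length == 2 ? Integer.parseInt(sides[1].trim()) : source;
            forwarded.put(source, target);
        }
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(parse("0"));          // {0=0}
        System.out.println(parse("0->1; 1->0")); // {0=1, 1=0}
        System.out.println(parse("0; 1"));       // {0=0, 1=1}
    }
}
```

Reading an annotation this way is a quick check that every source position exists in the input type and every target position exists in the output type.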

Select join type

You can make your joins faster if you give Flink another hint, but before we discuss why it works, let’s talk about how Flink executes joins.

When Flink is processing batch data, each machine in a cluster stores part of the data. To perform a join, Apache Flink needs to find all pairs of items from the two datasets for which the join condition is satisfied. To do this, Flink first has to put items from both datasets that have the same key on the same machine in the cluster. There are two strategies for this:

  • Repartition-Repartition strategy – in this case, both datasets are partitioned by their keys and sent across the network. It means that if the datasets are big, it may take a significant amount of time to copy them across the network.
  • Broadcast-Forward strategy – in this case, one dataset is left untouched, but the second dataset is copied to every machine in the cluster that has part of the first dataset.
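
A rough back-of-the-envelope model shows when each strategy moves less data. The sketch below assumes n machines, evenly distributed data, and uniformly hashed keys, and it counts only the records sent over the network. The class and method names are hypothetical, and a real optimizer takes more into account than this.

```java
class JoinShippingCost {
    // Repartition-Repartition: each record of both datasets lands on a hash-chosen
    // machine, so on average (n - 1) / n of all records leave their current machine
    static double repartitionBoth(long firstSize, long secondSize, int machines) {
        return (firstSize + secondSize) * (machines - 1) / (double) machines;
    }

    // Broadcast-Forward: every record of the broadcast dataset is sent to the
    // (n - 1) machines that do not already hold it; the other dataset stays put
    static double broadcast(long broadcastSize, int machines) {
        return (double) broadcastSize * (machines - 1);
    }

    public static void main(String[] args) {
        int machines = 10;
        long small = 1_000;
        long large = 10_000_000;
        System.out.println("repartition both: " + repartitionBoth(small, large, machines));
        System.out.println("broadcast small:  " + broadcast(small, machines));
        System.out.println("broadcast large:  " + broadcast(large, machines));
    }
}
```

With the numbers in main, broadcasting the small dataset ships 9,000 records, compared with about 9 million for repartitioning both datasets. Under these assumptions, broadcasting wins when the broadcast dataset is smaller than the other dataset divided by (n - 1).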

If you are joining a small dataset with a much bigger dataset, you can use the Broadcast-Forward strategy and avoid costly repartitioning of the bigger dataset. This is really easy to do:

ds1.join(ds2, JoinHint.BROADCAST_HASH_FIRST)

This hints that the first dataset is much smaller than the second one.

You can also use other join hints:

  • BROADCAST_HASH_SECOND – the second dataset is much smaller
  • REPARTITION_HASH_FIRST – the first dataset is a bit smaller
  • REPARTITION_HASH_SECOND – the second dataset is a bit smaller
  • REPARTITION_SORT_MERGE – repartition both datasets and use sorting and merging strategy
  • OPTIMIZER_CHOOSES – Flink optimizer will decide how to join datasets

You can read more about how Flink performs joins in this article.

More information

I hope you liked this article and found it useful.

I will write more articles about Flink in the near future, so stay tuned! You can read my other articles here, or you can take a look at my Pluralsight course where I cover Apache Flink in more detail: Understanding Apache Flink. Here is a short preview of this course.

Posted by Ivan Mushketyk

Principal Software engineer and life-long learner. Creating courses for Pluralsight. Writing for DZone, SitePoint, and SimpleProgrammer.